通常來說,對於允許併發多執行分支的內核或引擎來說,都需要提供對應的通信機制和同步機制。
例如,多進程之間,有進程間通信方式,比如管道、套接字、共享內存、消息隊列等,還有進程間的同步機制,例如信號量、文件鎖、條件變量等。多線程之間,也有線程間通信方式,簡單粗暴的是直接共享同進程內存,同步機制則有互斥鎖、條件變量等。
tokio提供了異步多任務的併發能力,它也需要提供異步任務之間的通信方式和同步機制。
分為底下幾種通道
就像告知狀態,oneshot是一對一的通知
範例如下:
use tokio::sync::oneshot;
use std::time::Duration;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
// 撮合引擎 task 等待風控訊號
tokio::spawn(async move {
println!("撮合引擎等待風控通知...");
match rx.await {
Ok(msg) => println!("撮合引擎收到風控通知: {}", msg),
Err(_) => println!("風控通道被關閉,無法收到通知"),
}
});
// 模擬外部風控邏輯
tokio::spawn(async move {
// 模擬風控檢查花 3 秒
tokio::time::sleep(Duration::from_secs(3)).await;
let condition = true; // 可以換成 false 試試不送的狀況
process_risk_check(tx, condition);
}).await.unwrap(); // 等待風控執行完
}
// 外部邏輯做條件判斷
fn process_risk_check(tx: oneshot::Sender<&'static str>, condition: bool) {
if condition {
let _ = tx.send("開始撮合");
} else {
println!("風控未通過,不送出撮合指令");
// 如果沒 send,撮合 task 就會一直等住
}
}
在這裡 match 需要寫在 spawn 外面 主程式的其他 function 才可以正常執行
如果將上面改為 false 執行 結果如同底下
就像是使用不同裝置下單,是多對一的狀態
範例如下
use tokio::sync::mpsc;
use std::time::Duration;
#[tokio::main]
async fn main() {
// 建立 channel,容量為 10
let (tx, mut rx) = mpsc::channel::<String>(10);
// 撮合引擎 task:負責接收所有訂單
tokio::spawn(async move {
while let Some(order) = rx.recv().await {
println!("撮合引擎收到訂單:{}", order);
}
});
// 模擬三個下單來源:API、手機、網頁
let tx_api = tx.clone();
let tx_mobile = tx.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
let _ = tx_api.send("API 下單: 買 BTC 1 張".to_string()).await;
});
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
let _ = tx_mobile.send("手機下單: 賣 ETH 2 張".to_string()).await;
});
tokio::time::sleep(Duration::from_secs(1)).await;
let _ = tx.send("網頁下單: 買 DOGE 100 張".to_string()).await;
}
實際執行後的範例如下
可以看到接受的順序是未知的
一對多的傳送訊息
這裡舉一個跨檔案的例子
use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};
pub async fn run_ui_listener(name: &str, mut rx: broadcast::Receiver<&'static str>) {
let name = name.to_string();
tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
println!("[{}] 收到交易所通知:{}", name, msg);
}
});
// 模擬 UI 正在運行
sleep(Duration::from_secs(1)).await;
}
mod ui;
use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (tx, _) = broadcast::channel::<&'static str>(16);
// 啟動三個 UI 接收端
ui::run_ui_listener("Web UI", tx.subscribe()).await;
ui::run_ui_listener("Mobile App", tx.subscribe()).await;
ui::run_ui_listener("API Client", tx.subscribe()).await;
// 等待所有訂閱者啟動
sleep(Duration::from_secs(1)).await;
// 廣播一條系統通知
let _ = tx.send("系統將於 12:00 暫停交易");
// 等待訊息印出
sleep(Duration::from_secs(1)).await;
}
實際的輸出結果就像
同樣是一對多,但和 broadcast 不一樣的是 watch 永遠只保存一個數據,所以只有最新的狀態
範例如下
use tokio::sync::watch;
use tokio::time::{sleep, Duration};
pub async fn run_component(name: &str, mut rx: watch::Receiver<&'static str>) {
let name = name.to_string();
tokio::spawn(async move {
loop {
// 等待狀態改變
if rx.changed().await.is_ok() {
let state = *rx.borrow();
println!("[{}] 市場狀態更新為:{}", name, state);
// 模擬收到關閉就停止操作
if state == "closed" {
println!("[{}] 停止操作", name);
break;
}
}
}
});
}
mod module;
use tokio::sync::watch;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
// 建立 watch channel,初始狀態為「尚未開盤」
let (tx, rx) = watch::channel("pending");
// 啟動三個模組去監聽市場狀態
module::run_component("前台 Web", rx.clone()).await;
module::run_component("撮合引擎", rx.clone()).await;
module::run_component("API Gateway", rx.clone()).await;
// 模擬一秒後開盤
sleep(Duration::from_secs(1)).await;
let _ = tx.send("open");
// 模擬再過兩秒收盤
sleep(Duration::from_secs(2)).await;
let _ = tx.send("closed");
// 等一點時間讓所有模組印出
sleep(Duration::from_secs(2)).await;
}
實際的運行結果如下